/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
* $Id: we_xmljob.cpp 4579 2013-03-19 23:16:54Z dhall $
*
*******************************************************************************/
/** @file */

#define WRITEENGINEXMLJOB_DLLEXPORT
#include "we_xmljob.h"
#undef WRITEENGINEXMLJOB_DLLEXPORT

#include <limits>
#include <sstream>
#include <unistd.h>
#include <stdexcept>
#include <cstdlib>
#include <set>
#include "we_config.h"
#include "we_log.h"
#include "we_convertor.h"
#include "dataconvert.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/convenience.hpp>

#include <sys/time.h>

using namespace std;
using namespace execplan;


namespace WriteEngine
{
// Maximum saturation value for DECIMAL types based on precision
const long long infinidb_precision[19] =
{
    0,
    9,
    99,
    999,
    9999,
    99999,
    999999,
    9999999,
    99999999,
    999999999,
    9999999999LL,
    99999999999LL,
    999999999999LL,
    9999999999999LL,
    99999999999999LL,
    999999999999999LL,
    9999999999999999LL,
    99999999999999999LL,
    999999999999999999LL
};

//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
XMLJob::XMLJob( ) : fDebugLevel( DEBUG_0 ),
    fDeleteTempFile(false),
    fValidateColList(true)
{
}

//------------------------------------------------------------------------------
// Default Destructor
// Delete temporary Job XML file if applicable.
//------------------------------------------------------------------------------
XMLJob::~XMLJob()
{
    if ((fDeleteTempFile) && (!fJobFileName.empty()))
    {
        unlink( fJobFileName.c_str() );
    }
}

//------------------------------------------------------------------------------
// Load a job xml file
// fileName - name of file to load
// bTempFile - are we loading a temporary file (that destructor should delete)
// bValidateColumnList - validate that all db columns have an XML tag
// returns NO_ERROR if success; other if fail
//------------------------------------------------------------------------------
int XMLJob::loadJobXmlFile( const string& fileName,
                            bool bTempFile,
                            bool bValidateColumnList,
                            string& errMsg )
{
    int rc;

    fDeleteTempFile = bTempFile;
    fJobFileName    = fileName;
    fValidateColList = bValidateColumnList;

    try
    {
        rc = parseDoc( fileName.c_str() );

        if (rc != NO_ERROR)
            return rc;
    }
    catch (exception& ex)
    {
        errMsg = ex.what();
        return ERR_XML_PARSE;
    }

    return rc;
}

//------------------------------------------------------------------------------
// Print contents of fJob to the specified logger object.
// logger - Log object to use in logging
//------------------------------------------------------------------------------
void XMLJob::printJobInfo( Log& logger ) const
{
    const Job& job = fJob;

    ostringstream oss1;
    oss1 << "Job " << job.id << " input\n";
    oss1 << "===============================================" << endl;
    oss1 << "Name : " << job.name << endl;
    oss1 << "Desc : " << job.desc << endl;
    oss1 << "User : " << job.userName << endl;
    oss1 << "Delim: " << job.fDelimiter << endl;
    oss1 << "Enclosed By : ";

    if (job.fEnclosedByChar)
        oss1 << job.fEnclosedByChar << endl;
    else
        oss1 << "n/a" << endl;

    oss1 << "Escape Char : ";

    if (job.fEscapeChar)
        oss1 << job.fEscapeChar << endl;
    else
        oss1 << "n/a" << endl;

    oss1 << "Read Buffers:     " << job.numberOfReadBuffers << endl;
    oss1 << "Read Buffer Size: " << job.readBufferSize << endl;
    oss1 << "setvbuf Size: " << job.writeBufferSize << endl;
    oss1 << "Create Date : " << job.createDate << endl;
    oss1 << "Create Time : " << job.createTime << endl;
    oss1 << "Schema Name : " << job.schema << endl;

    oss1 << "Num Tables  : " << job.jobTableList.size() << endl;
    logger.logMsg( oss1.str(), MSGLVL_INFO2 );

    for ( unsigned int i = 0; i < job.jobTableList.size(); i++ )
    {
        const JobTable& jobTable = job.jobTableList[i];
        ostringstream oss2;
        oss2 << "\n-------------------------------------------------" << endl;
        oss2 << "\tTable Name      : " << jobTable.tblName << endl;
        oss2 << "\tTable OID       : " << jobTable.mapOid << endl;
        oss2 << "\tTable Load Name : " << jobTable.loadFileName <<
             endl;
        oss2 << "\tMax Err Num     : " << jobTable.maxErrNum << endl;

        const JobColList& colList = jobTable.colList;

        oss2 << "\tNum of Columns  : " << colList.size() << endl;
        logger.logMsg( oss2.str(), MSGLVL_INFO2 );

        // Note that we don't print JobColumn.dataType because it is not carried
        // in the XML file.  dataType is assigned/used internally by bulkload.
        for ( unsigned int j = 0; j < jobTable.fFldRefs.size(); j++ )
        {
            unsigned idx            = jobTable.fFldRefs[j].fArrayIndex;
            BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType;
            const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ?
                                       jobTable.fIgnoredFields[idx] :
                                       jobTable.colList[idx] );
            ostringstream oss3;
            oss3 << "\n\t****************************************" << endl;

            if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT)
                oss3 << "\t\tDefaultColumn Name: " << jobCol.colName << endl;
            else
                oss3 << "\t\tColumn Name       : " << jobCol.colName << endl;

            oss3 << "\t\tColumn OID        : " << jobCol.mapOid << endl;
            oss3 << "\t\tColumn type name  : " << jobCol.typeName << endl;
            oss3 << "\t\tColumn width      : " << jobCol.width << endl;
            oss3 << "\t\tColumn Not Null   : " << jobCol.fNotNull << endl;
            oss3 << "\t\tColumn WithDefault: " << jobCol.fWithDefault << endl;
            oss3 << "\t\tColumn type       : " << jobCol.colType << endl;
            oss3 << "\t\tColumn comp type  : " << jobCol.compressionType << endl;
            oss3 << "\t\tColumn autoInc    : " << jobCol.autoIncFlag << endl;

            if ( jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL] )
            {
                oss3 << "\t\tColumn Precision  : " << jobCol.precision << endl;
                oss3 << "\t\tColumn Scale      : " << jobCol.scale << endl;
            }

            if ( jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL] )
            {
                oss3 << "\t\tColumn Precision  : " << jobCol.precision << endl;
                oss3 << "\t\tColumn Scale      : " << jobCol.scale << endl;
            }

            if ( jobCol.colType == 'D' )
            {
                oss3 << "\t\tDictionary Oid    : " <<
                     jobCol.dctnry.dctnryOid << endl;
            }

            logger.logMsg( oss3.str(), MSGLVL_INFO2 );
        } // end of loop through columns in a table
    } // end of loop through tables
}

//------------------------------------------------------------------------------
// Print brief contents of specified Job to specified logger object.
// logger - Log object to use in logging
//------------------------------------------------------------------------------
void XMLJob::printJobInfoBrief( Log& logger ) const
{
    const Job& job = fJob;

    ostringstream oss1;
    oss1 << "XMLJobFile: Delim(" << job.fDelimiter << "); EnclosedBy(";

    if (job.fEnclosedByChar)
        oss1 << job.fEnclosedByChar;
    else
        oss1 << "n/a";

    oss1 << "); EscapeChar(";

    if (job.fEscapeChar)
        oss1 << job.fEscapeChar;
    else
        oss1 << "n/a";

    oss1 << "); ReadBufs("    << job.numberOfReadBuffers <<
         "); ReadBufSize(" << job.readBufferSize      <<
         "); setvbufSize(" << job.writeBufferSize     << ')';
    logger.logMsg( oss1.str(), MSGLVL_INFO2 );

    for ( unsigned int i = 0; i < job.jobTableList.size(); i++ )
    {
        const JobTable& jobTable = job.jobTableList[i];
        ostringstream oss2;
        oss2 << "  Table(" << jobTable.tblName <<
             "); OID("  << jobTable.mapOid  << ')' <<
             "; MaxErrNum(" << jobTable.maxErrNum << ')';
        logger.logMsg( oss2.str(), MSGLVL_INFO2 );

        for ( unsigned int j = 0; j < jobTable.fFldRefs.size(); j++ )
        {
            unsigned idx            = jobTable.fFldRefs[j].fArrayIndex;
            BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType;
            const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ?
                                       jobTable.fIgnoredFields[idx] :
                                       jobTable.colList[idx]);
            ostringstream oss3;

            if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT)
                oss3 << "    DefaultColumn(" << jobCol.colName;
            else
                oss3 << "    Column("        << jobCol.colName;

            oss3 << "); OID("     << jobCol.mapOid   <<
                 "); Type("    << jobCol.typeName <<
                 "); Width("   << jobCol.width    <<
                 "); Comp("    << jobCol.compressionType;

            if ( jobCol.colType == 'D' )
                oss3 << "); DctnryOid(" << jobCol.dctnry.dctnryOid;

            oss3 << ')';

            if (jobCol.autoIncFlag)
                oss3 << "; autoInc";

            if (jobCol.fNotNull)
                oss3 << "; NotNull";

            if (jobCol.fWithDefault)
                oss3 << "; WithDefault";

            logger.logMsg( oss3.str(), MSGLVL_INFO2 );
        }
    } // end of for( int i
}

//------------------------------------------------------------------------------
// Process a node
// pNode - current node
// returns TRUE if success, FALSE otherwise
//------------------------------------------------------------------------------
bool XMLJob::processNode( xmlNode* pNode )
{
    if ( isTag( pNode, TAG_BULK_JOB ))
    {
        // no work for the BulkJob tag
    }
    else if ( isTag( pNode, TAG_CREATE_DATE ))
        setJobData( pNode, TAG_CREATE_DATE, true, TYPE_CHAR );
    else if ( isTag( pNode, TAG_CREATE_TIME ))
        setJobData( pNode, TAG_CREATE_TIME, true, TYPE_CHAR );
    else if ( isTag( pNode, TAG_COLUMN ))
        setJobData( pNode, TAG_COLUMN, false, TYPE_EMPTY );
    else if ( isTag( pNode, TAG_DEFAULT_COLUMN ))
        setJobData( pNode, TAG_DEFAULT_COLUMN, false, TYPE_EMPTY );
    else if ( isTag( pNode, TAG_DESC ))
        setJobData( pNode, TAG_DESC, true, TYPE_CHAR );
    else if ( isTag( pNode, TAG_ID ))
        setJobData( pNode, TAG_ID, true, TYPE_INT );
    else if ( isTag( pNode, TAG_IGNORE_FIELD ))
        setJobData( pNode, TAG_IGNORE_FIELD, false, TYPE_EMPTY );
    else if ( isTag( pNode, TAG_NAME ))
        setJobData( pNode, TAG_NAME, true, TYPE_CHAR );
    else if ( isTag( pNode, TAG_PATH ))
        setJobData( pNode, TAG_PATH, true, TYPE_CHAR );
    else if ( isTag( pNode, TAG_TABLE ))
        setJobData( pNode, TAG_TABLE, false, TYPE_EMPTY );
    else if ( isTag( pNode, TAG_TYPE ))
        setJobData( pNode, TAG_TYPE, true, TYPE_CHAR );
    else if ( isTag( pNode, TAG_USER ))
        setJobData( pNode, TAG_USER, true, TYPE_CHAR );
    else if ( isTag( pNode, TAG_SCHEMA))
        setJobData( pNode, TAG_SCHEMA, false, TYPE_EMPTY );
    else if ( isTag( pNode, TAG_READ_BUFFERS))
        setJobData( pNode, TAG_READ_BUFFERS, false, TYPE_EMPTY );
    else if ( isTag( pNode, TAG_WRITE_BUFFER_SIZE))
        setJobData( pNode, TAG_WRITE_BUFFER_SIZE, true, TYPE_INT);
    else if ( isTag( pNode, TAG_DELIMITER))
        setJobData( pNode, TAG_DELIMITER, true, TYPE_CHAR);
    else if ( isTag( pNode, TAG_ENCLOSED_BY_CHAR))
        setJobData( pNode, TAG_ENCLOSED_BY_CHAR, true, TYPE_CHAR);
    else if ( isTag( pNode, TAG_ESCAPE_CHAR))
        setJobData( pNode, TAG_ESCAPE_CHAR, true, TYPE_CHAR);
    else
    {
        ostringstream oss;
        oss << "Unrecognized TAG in Job XML file: <" << pNode->name << ">";
        throw runtime_error( oss.str() );
    }

    if (XMLOp::processNode( pNode ))
    {
        if ( isTag( pNode, TAG_TABLE ))
        {
            postProcessTableNode();
        }
    }
    else
    {
        return false;
    }

    return true;
}

//------------------------------------------------------------------------------
// Generic setter
// pNode - current node
// tag - xml tag
// bExpectContent - should node content be present to process
// tagType - data type
//------------------------------------------------------------------------------
void XMLJob::setJobData( xmlNode* pNode,
                         const xmlTag tag,
                         bool  bExpectContent,
                         XML_DTYPE tagType )
{
    int         intVal = 0;
    long long   llVal = 0;
    std::string bufString;
    bool        bSuccess = false;

    if (bExpectContent)
    {
        if ( tagType == TYPE_INT )
            bSuccess = getNodeContent( pNode, &intVal, TYPE_INT );
        else // longlong
            if ( tagType == TYPE_LONGLONG )
                bSuccess = getNodeContent( pNode, &llVal, TYPE_LONGLONG );
            else // char
                if ( tagType == TYPE_CHAR )
                    bSuccess = getNodeContentStr( pNode, bufString );

        if (!bSuccess)
            return;
    }

    // process tag content and attributes
    switch ( tag )
    {
        case  TAG_READ_BUFFERS:
            setReadBuffers( pNode );
            break;

        case  TAG_COLUMN:
            setJobDataColumn( pNode, false );
            break;

        case  TAG_CREATE_DATE:
            fJob.createDate = bufString;
            break;

        case  TAG_CREATE_TIME:
            fJob.createTime = bufString;
            break;

        case  TAG_DEFAULT_COLUMN:
            setJobDataColumn( pNode, true );
            break;

        case  TAG_DESC:
            fJob.desc = bufString;
            break;

        case  TAG_ID:
            fJob.id = intVal;
            break;

        case  TAG_IGNORE_FIELD:
            setJobDataIgnoreField( );
            break;

        case  TAG_NAME:
            fJob.name = bufString;
            break;

        case  TAG_PATH:
            // no action necessary, but keep for backwards compatability
            break;

        case  TAG_TABLE:
            setJobDataTable( pNode );
            break;

        case  TAG_TYPE:
            // no action necessary, but keep for backwards compatability
            break;

        case  TAG_USER:
            fJob.userName = bufString;
            break;

        case  TAG_SCHEMA:
            setSchema( pNode );
            break;

        case TAG_WRITE_BUFFER_SIZE:
            fJob.writeBufferSize  = intVal;
            break;

        case TAG_DELIMITER:
        {
            const char* buf = bufString.c_str();

            if ((!strcmp(buf, "\\t")) ||
                    (!strcmp(buf, "'\\t'")))
            {
                fJob.fDelimiter = '\t';
            }
            else
            {
                fJob.fDelimiter = bufString[0];
            }

            break;
        }

        case TAG_ENCLOSED_BY_CHAR:
        {
            fJob.fEnclosedByChar = bufString[0];
            break;
        }

        case TAG_ESCAPE_CHAR:
        {
            fJob.fEscapeChar = bufString[0];
            break;
        }

        default:
            break;
    }
}

//------------------------------------------------------------------------------
// Set table information parms.
// pNode - current node
//------------------------------------------------------------------------------
void XMLJob::setJobDataTable( xmlNode* pNode )
{
    int         intVal;
    std::string bufString;
    JobTable    curTable;

    if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_ORIG_NAME], bufString ) )
        curTable.tblName = bufString;

    if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_TBL_NAME], bufString ) )
        curTable.tblName = bufString;

    if (curTable.tblName.empty())
    {
        throw runtime_error(
            "Required table name attribute (tblName) missing from Table tag");
    }

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_TBL_OID], &intVal, TYPE_INT ) )
        curTable.mapOid = intVal;

    if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_LOAD_NAME], bufString ) )
        curTable.loadFileName = bufString;

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_MAX_ERR_ROW], &intVal,
                           TYPE_INT))
        curTable.maxErrNum = intVal;

    fJob.jobTableList.push_back( curTable );
}

//------------------------------------------------------------------------------
// Set column information parms.
// pNode - current node
// bDefaultCol - is this a <DefaultColumn> tag
//
// Note on Supported Tags: (Bug 2828)
// Note that the "notnull" and "defaultValue" attribute tags are not recognized
// by this function because by the time we added support for these tags, we had
// changed to only store the table and column names in the XML file.  Much of
// the functionality in setJobDataColumn() is only present to provide backwards
// compatability for an old Job XML file that a user might still be using.
//
// Any other new tags probably don't need adding to setJobDataColumn() either,
// for the same reason.
//------------------------------------------------------------------------------
void XMLJob::setJobDataColumn( xmlNode* pNode, bool bDefaultCol )
{
    int         intVal;
    std::string bufString;
    JobColumn   curColumn;

    if ( fJob.jobTableList.size() == 0 )
        return;

    int tableNo = fJob.jobTableList.size() - 1;

    if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_ORIG_NAME], bufString ) )
        curColumn.colName = bufString;

    if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_COL_NAME], bufString ) )
        curColumn.colName = bufString;

    if (curColumn.colName.empty())
    {
        ostringstream oss;
        oss << "Required column name attribute (colName) missing from "
            "Column tag for table " <<
            fJob.jobTableList[tableNo].tblName;
        throw runtime_error( oss.str() );
    }

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_COL_OID], &intVal, TYPE_INT ) )
        curColumn.mapOid = intVal;

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_WIDTH], &intVal, TYPE_INT ) )
    {
        curColumn.width = intVal;
        curColumn.definedWidth = intVal; //@Bug 3040
    }

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_PRECISION], &intVal, TYPE_INT))
        curColumn.precision = intVal;

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_SCALE], &intVal, TYPE_INT ) )
        curColumn.scale = intVal;

    if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_DATA_TYPE], bufString ) )
        curColumn.typeName = bufString;

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_COMPRESS_TYPE], &intVal,
                           TYPE_INT))
    {
        curColumn.compressionType = intVal;
        curColumn.dctnry.fCompressionType = intVal;
    }

    if ( getNodeAttribute( pNode, xmlTagTable[TAG_AUTOINCREMENT_FLAG],
                           &intVal, TYPE_INT))
    {
        if (intVal)
            curColumn.autoIncFlag = true;
        else
            curColumn.autoIncFlag = false;
    }

    if ( getNodeAttributeStr( pNode, xmlTagTable[TAG_COL_TYPE], bufString ) )
    {
        const char* buf = bufString.c_str();

        if ( !strcmp( buf, "D" ) )
        {
            curColumn.colType = 'D';

            // @Bug 2565: Retain dictionary width to use in truncating strings,
            // since BulkLoad eventually stores column token width in 'width'.
            curColumn.dctnryWidth = curColumn.width;

            if ( getNodeAttribute( pNode,
                                   xmlTagTable[TAG_DVAL_OID],
                                   &intVal,
                                   TYPE_INT ) )
                curColumn.dctnry.dctnryOid = intVal;
        }
    }

    // This is a workaround that DBBuilder can not pass decimal type to XML file
    if ( ( curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::INT] ||
            curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT] ||
            curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT] ||
            curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]) &&
            curColumn.scale > 0 )
        curColumn.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];

    // end of workaround

    // Initialize the saturation limits for this column
    initSatLimits( curColumn );

    // Save default columns in separate list, so that we can intentionally
    // add/keep them at the "end" of colList later, after all other columns.
    if (bDefaultCol) // temporarily save in separate list
    {
        curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT;
        fDefaultColumns.push_back ( curColumn );
    }
    else
    {
        // Add to list of db columns to be loaded
        curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_FIELD;
        fJob.jobTableList[tableNo].colList.push_back ( curColumn );

        // Add to combined field list of columns and ignored fields
        JobFieldRef fieldRef( BULK_FLDCOL_COLUMN_FIELD,
                              fJob.jobTableList[tableNo].colList.size() - 1 );
        fJob.jobTableList[tableNo].fFldRefs.push_back( fieldRef  );
    }
}

//------------------------------------------------------------------------------
// Set column information parms for an input field that is to be ignored
//------------------------------------------------------------------------------
void XMLJob::setJobDataIgnoreField( )
{
    JobColumn curColumn;

    int tableNo = fJob.jobTableList.size() - 1;
    ostringstream oss;
    oss << "IgnoreField" << fJob.jobTableList[tableNo].fFldRefs.size() + 1;
    curColumn.colName     = oss.str();

    // Add to list of ignored fields
    curColumn.fFldColRelation = BULK_FLDCOL_IGNORE_FIELD;
    fJob.jobTableList[tableNo].fIgnoredFields.push_back( curColumn );

    // Add to combined field list of columns and ignored fields
    JobFieldRef fieldRef( BULK_FLDCOL_IGNORE_FIELD,
                          fJob.jobTableList[tableNo].fIgnoredFields.size() - 1 );
    fJob.jobTableList[tableNo].fFldRefs.push_back      ( fieldRef  );
}

//------------------------------------------------------------------------------
// Initialize the saturation limits for the specified column.
//------------------------------------------------------------------------------
void XMLJob::initSatLimits( JobColumn& curColumn ) const
{
    // If one of the integer types, we set the min/max saturation value.
    // For DECIMAL columns this will vary with the precision.
    if      ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::INT] )
    {
        curColumn.fMinIntSat = MIN_INT;
        curColumn.fMaxIntSat = MAX_INT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::UINT] )
    {
        curColumn.fMinIntSat = MIN_UINT;
        curColumn.fMaxIntSat = MAX_UINT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::BIGINT] )
    {
        curColumn.fMinIntSat = MIN_BIGINT;
        curColumn.fMaxIntSat = MAX_BIGINT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::UBIGINT] )
    {
        curColumn.fMinIntSat = MIN_UBIGINT;
        curColumn.fMaxIntSat = MAX_UBIGINT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::SMALLINT] )
    {
        curColumn.fMinIntSat = MIN_SMALLINT;
        curColumn.fMaxIntSat = MAX_SMALLINT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::USMALLINT] )
    {
        curColumn.fMinIntSat = MIN_USMALLINT;
        curColumn.fMaxIntSat = MAX_USMALLINT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::TINYINT] )
    {
        curColumn.fMinIntSat = MIN_TINYINT;
        curColumn.fMaxIntSat = MAX_TINYINT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::UTINYINT] )
    {
        curColumn.fMinIntSat = MIN_UTINYINT;
        curColumn.fMaxIntSat = MAX_UTINYINT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::DECIMAL] )
    {
        curColumn.fMinIntSat = -infinidb_precision[curColumn.precision];
        curColumn.fMaxIntSat = infinidb_precision[curColumn.precision];
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::UDECIMAL] )
    {
        curColumn.fMinIntSat = 0;
        curColumn.fMaxIntSat = infinidb_precision[curColumn.precision];
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::FLOAT] )
    {
        curColumn.fMinDblSat = MIN_FLOAT;
        curColumn.fMaxDblSat = MAX_FLOAT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::UFLOAT] )
    {
        curColumn.fMinDblSat = 0.0;
        curColumn.fMaxDblSat = MAX_FLOAT;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::DOUBLE] )
    {
        curColumn.fMinDblSat = MIN_DOUBLE;
        curColumn.fMaxDblSat = MAX_DOUBLE;
    }
    else if ( curColumn.typeName ==
              ColDataTypeStr[CalpontSystemCatalog::UDOUBLE] )
    {
        curColumn.fMinDblSat = 0.0;
        curColumn.fMaxDblSat = MAX_DOUBLE;
    }
}

//------------------------------------------------------------------------------
// Set Read Buffers attributes
// pNode - current node
//------------------------------------------------------------------------------
void XMLJob::setReadBuffers( xmlNode* pNode )
{
    int intVal = 0;

    if (getNodeAttribute(pNode,
                         xmlTagTable[TAG_NO_OF_READ_BUFFERS],
                         &intVal,
                         TYPE_INT ))
        fJob.numberOfReadBuffers = intVal;

    if (getNodeAttribute(pNode,
                         xmlTagTable[TAG_READ_BUFFER_SIZE],
                         &intVal,
                         TYPE_INT ))
        fJob.readBufferSize = intVal;
}

//------------------------------------------------------------------------------
// Set Schema attributes
// pNode - current node
//------------------------------------------------------------------------------
void XMLJob::setSchema( xmlNode* pNode )
{
    std::string bufString;

    if ( getNodeAttributeStr( pNode,
                              xmlTagTable[TAG_SCHEMA_NAME],
                              bufString ) )
        fJob.schema = bufString;
}

//------------------------------------------------------------------------------
// Transfer any/all <DefaultColumn> columns from temporary fDefaultColumns, to
// the end of the column/field lists.
// It is assumed that we are working with the last table in jobTableList.
// Then get additional information from system catalog to finish populating
// our Job structs with all the table and column attributes we need.
//------------------------------------------------------------------------------
void XMLJob::postProcessTableNode()
{
    bool bValidateNoDefColWithoutDefValue = false;

    if (fDefaultColumns.size() > 0)
    {
        bValidateNoDefColWithoutDefValue = true;
        int tableNo = fJob.jobTableList.size() - 1;

        for (unsigned k = 0; k < fDefaultColumns.size(); k++)
        {
            // Add to list of db columns to be loaded
            fJob.jobTableList[tableNo].colList.push_back( fDefaultColumns[k] );

            // Add to combined list of columns and ignored fields
            JobFieldRef fieldRef( BULK_FLDCOL_COLUMN_DEFAULT,
                                  fJob.jobTableList[tableNo].colList.size() - 1 );
            fJob.jobTableList[tableNo].fFldRefs.push_back( fieldRef );
        }

        fDefaultColumns.clear();
    }

    // Supplement xml file contents with information from syscat
    execplan::CalpontSystemCatalog::RIDList colRidList;
    fillInXMLDataAsLoaded( colRidList );

    // After getting all the system catalog information...
    // Validate that if there are any <DefaultColumn> tags for a NotNull
    // column, that the column is defined as NotNull With Default.
    if (bValidateNoDefColWithoutDefValue)
    {
        int tableNo = fJob.jobTableList.size() - 1;

        for (unsigned int iCol = 0;
                iCol < fJob.jobTableList[tableNo].colList.size(); iCol++)
        {
            JobColumn& col = fJob.jobTableList[tableNo].colList[iCol];

            if (col.fFldColRelation == BULK_FLDCOL_COLUMN_DEFAULT)
            {
                if ( (col.fNotNull) && (!col.fWithDefault) )
                {
                    std::ostringstream oss;
                    oss << "Column " << col.colName << " in table " <<
                        fJob.jobTableList[tableNo].tblName << " is NotNull "
                        "w/o default; cannot be used with <DefaultColumn>";
                    throw std::runtime_error( oss.str() );
                }
            }
        }
    }

    // Make sure all Columns in the DB are counted for with <Column> or
    // <DefaultColumn> tags (unless validate is disabled)
    if (fValidateColList)
        validateAllColumnsHaveTags( colRidList );
}

//------------------------------------------------------------------------------
// Use the table and column names from the last <Table> just loaded, to
// collect the remaining information from the system catalog, in order to
// populate the JobColumn structure.
//------------------------------------------------------------------------------
void XMLJob::fillInXMLDataAsLoaded(
    execplan::CalpontSystemCatalog::RIDList& colRidList)
{
    boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
        execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
            BULK_SYSCAT_SESSION_ID);
    cat->identity(execplan::CalpontSystemCatalog::EC);

    // Get the table and column attributes for the last <Table> processed
    unsigned int iTbl = fJob.jobTableList.size() - 1;
    JobTable& tbl = fJob.jobTableList[iTbl];

    std::string tblName;
    string::size_type startName = tbl.tblName.rfind('.');

    if (startName == string::npos)
        tblName.assign( tbl.tblName );
    else
        tblName.assign( tbl.tblName.substr(startName + 1) );

    execplan::CalpontSystemCatalog::TableName table(
        fJob.schema, tblName );

    if (fJob.jobTableList[iTbl].mapOid == 0)
    {
        execplan::CalpontSystemCatalog::OID tblOid =
            cat->tableRID(table).objnum;
        tbl.mapOid = tblOid;
    }

    // This call is made to improve performance.
    // The call forces all the column information for this table to be
    // cached at one time, instead of doing it piece-meal through repeated
    // calls to lookupOID().
    colRidList = cat->columnRIDs(table, true);

    // Loop through the columns to get the column attributes
    for (unsigned int iCol = 0;
            iCol < fJob.jobTableList[iTbl].colList.size(); iCol++)
    {
        JobColumn& col = fJob.jobTableList[iTbl].colList[iCol];

        if (col.mapOid == 0)
        {
            execplan::CalpontSystemCatalog::TableColName column;
            column.schema = fJob.schema;
            column.table  = tblName;
            column.column = col.colName;
            execplan::CalpontSystemCatalog::OID colOid =
                cat->lookupOID( column );

            if (colOid < 0)
            {
                ostringstream oss;
                oss << "Column OID lookup failed for: " << column;
                throw runtime_error( oss.str() );
            }

            col.mapOid    = colOid;

            execplan::CalpontSystemCatalog::ColType colType =
                cat->colType( col.mapOid );

            col.width                   = colType.colWidth;
            col.definedWidth            = colType.colWidth;

            if ((colType.scale > 0) ||
                    (colType.colDataType ==
                     execplan::CalpontSystemCatalog::DECIMAL) ||
                    (colType.colDataType ==
                     execplan::CalpontSystemCatalog::UDECIMAL))
            {
                col.precision           = colType.precision;
                col.scale               = colType.scale;
            }

            col.typeName                = ColDataTypeStr[colType.colDataType];
            col.compressionType         = colType.compressionType;
            col.dctnry.fCompressionType = colType.compressionType;

            if (colType.autoincrement)
                col.autoIncFlag         = true;
            else
                col.autoIncFlag         = false;

            // Initialize NotNull and Default Value (based on data type)
            fillInXMLDataNotNullDefault( tbl.tblName, colType, col );

            if (colType.ddn.dictOID > 0)
            {
                col.colType             = 'D';
                col.dctnryWidth         = colType.colWidth;
                col.dctnry.dctnryOid    = colType.ddn.dictOID;
            }

            // @bug3801: For backwards compatability, we treat
            // integer types with nonzero 0 scale as decimal if scale > 0
            if ( ((col.typeName ==
                    ColDataTypeStr[CalpontSystemCatalog::INT])      ||
                    (col.typeName ==
                     ColDataTypeStr[CalpontSystemCatalog::BIGINT])   ||
                    (col.typeName ==
                     ColDataTypeStr[CalpontSystemCatalog::SMALLINT]) ||
                    (col.typeName ==
                     ColDataTypeStr[CalpontSystemCatalog::TINYINT])) &&
                    (col.scale > 0) )
            {
                col.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
            }

            // Initialize the saturation limits for this column
            initSatLimits( col );
        }
    } // end of loop through columns
}

//------------------------------------------------------------------------------
// Using information from the system catalog (in colType), fill in the
// applicable NotNull Default values into the specified JobColumn.
//------------------------------------------------------------------------------
void XMLJob::fillInXMLDataNotNullDefault(
    const std::string& fullTblName,
    execplan::CalpontSystemCatalog::ColType& colType,
    JobColumn& col )
{
    const std::string col_defaultValue(colType.defaultValue);

    if (colType.constraintType ==
            execplan::CalpontSystemCatalog::NOTNULL_CONSTRAINT)
    {
        col.fNotNull            = true;

        if (!col_defaultValue.empty())
            col.fWithDefault    = true;
    }
    else if (colType.constraintType ==
             execplan::CalpontSystemCatalog::DEFAULT_CONSTRAINT)
    {
        col.fWithDefault        = true;
    }

    if (col.fWithDefault)
    {
        bool bDefaultConvertError = false;

        // Convert Default Value.
        // We go ahead and report basic format conversion error;
        // but we don't do complete validation (like checking to see
        // if the default is too large for the given integer type),
        // because we assume DDL is fully validating the default value.
        switch (colType.colDataType)
        {
            case execplan::CalpontSystemCatalog::BIT:
            case execplan::CalpontSystemCatalog::TINYINT:
            case execplan::CalpontSystemCatalog::SMALLINT:
            case execplan::CalpontSystemCatalog::MEDINT:
            case execplan::CalpontSystemCatalog::INT:
            case execplan::CalpontSystemCatalog::BIGINT:
            {
                errno = 0;
                col.fDefaultInt = strtoll(col_defaultValue.c_str(), 0, 10);

                if (errno == ERANGE)
                    bDefaultConvertError = true;

                break;
            }

            case execplan::CalpontSystemCatalog::UTINYINT:
            case execplan::CalpontSystemCatalog::USMALLINT:
            case execplan::CalpontSystemCatalog::UMEDINT:
            case execplan::CalpontSystemCatalog::UINT:
            case execplan::CalpontSystemCatalog::UBIGINT:
            {
                errno = 0;
                col.fDefaultUInt = strtoull(col_defaultValue.c_str(), 0, 10);

                if (errno == ERANGE)
                    bDefaultConvertError = true;

                break;
            }

            case execplan::CalpontSystemCatalog::DECIMAL:
            case execplan::CalpontSystemCatalog::UDECIMAL:
            {
                col.fDefaultInt = Convertor::convertDecimalString(
                                      col_defaultValue.c_str(),
                                      col_defaultValue.length(),
                                      colType.scale);

                if (errno == ERANGE)
                    bDefaultConvertError = true;

                break;
            }

            case execplan::CalpontSystemCatalog::DATE:
            {
                int convertStatus;
                int32_t dt =
                    dataconvert::DataConvert::convertColumnDate(
                        col_defaultValue.c_str(),
                        dataconvert::CALPONTDATE_ENUM, convertStatus,
                        col_defaultValue.length() );

                if (convertStatus != 0)
                    bDefaultConvertError = true;

                col.fDefaultInt = dt;
                break;
            }

            case execplan::CalpontSystemCatalog::DATETIME:
            {
                int convertStatus;
                int64_t dt =
                    dataconvert::DataConvert::convertColumnDatetime(
                        col_defaultValue.c_str(),
                        dataconvert::CALPONTDATETIME_ENUM, convertStatus,
                        col_defaultValue.length() );

                if (convertStatus != 0)
                    bDefaultConvertError = true;

                col.fDefaultInt = dt;
                break;
            }

            case execplan::CalpontSystemCatalog::TIME:
            {
                int convertStatus;
                int64_t dt =
                    dataconvert::DataConvert::convertColumnTime(
                        col_defaultValue.c_str(),
                        dataconvert::CALPONTTIME_ENUM, convertStatus,
                        col_defaultValue.length() );

                if (convertStatus != 0)
                    bDefaultConvertError = true;

                col.fDefaultInt = dt;
                break;
            }

            case execplan::CalpontSystemCatalog::FLOAT:
            case execplan::CalpontSystemCatalog::DOUBLE:
            case execplan::CalpontSystemCatalog::UFLOAT:
            case execplan::CalpontSystemCatalog::UDOUBLE:
            {
                errno = 0;
                col.fDefaultDbl = strtod(col_defaultValue.c_str(), 0);

                if (errno == ERANGE)
                    bDefaultConvertError = true;

                break;
            }

            default:
            {
                col.fDefaultChr = col_defaultValue;
                break;
            }
        }

        if (bDefaultConvertError)
        {
            std::ostringstream oss;
            oss << "Column " << col.colName << " in table " << fullTblName <<
                " has an invalid default value in system catalog.";
            throw std::runtime_error( oss.str() );
        }
    }
}

//------------------------------------------------------------------------------
// Use the table and column names from the last <Table> just loaded, to
// validate that all the columns have a <Column> or <DefaultColumn> tag
// present in the job XML file.
//------------------------------------------------------------------------------
void XMLJob::validateAllColumnsHaveTags(
    const execplan::CalpontSystemCatalog::RIDList& colRidList) const
{
    // Validate column list for the last <Table> processed
    unsigned int iTbl = fJob.jobTableList.size() - 1;
    const JobTable& tbl = fJob.jobTableList[iTbl];

    std::string tblName;
    string::size_type startName = tbl.tblName.rfind('.');

    if (startName == string::npos)
        tblName.assign( tbl.tblName );
    else
        tblName.assign( tbl.tblName.substr(startName + 1) );

    try
    {
        // Loop through column tags, saving col OIDs to a std::set for lookups
        std::set<execplan::CalpontSystemCatalog::OID> colOIDList;
        typedef std::set<execplan::CalpontSystemCatalog::OID>::iterator SetIter;
        std::pair<SetIter, bool> retVal;

        for (unsigned int iCol = 0;
                iCol < fJob.jobTableList[iTbl].colList.size(); iCol++)
        {
            const JobColumn& col = fJob.jobTableList[iTbl].colList[iCol];
            retVal = colOIDList.insert( col.mapOid );

            if (!retVal.second)
            {
                boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
                    execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
                        BULK_SYSCAT_SESSION_ID);
                cat->identity(execplan::CalpontSystemCatalog::EC);

                execplan::CalpontSystemCatalog::TableColName dbColName =
                    cat->colName( col.mapOid );
                std::ostringstream oss;
                oss << "Column " << dbColName.column << " referenced in Job XML"
                    " file more than once.";
                throw std::runtime_error( oss.str() );
            }
        }

        SetIter pos;

        // Loop thru cols in system catalog and verify that each one has a tag
        execplan::CalpontSystemCatalog::RIDList::const_iterator rid_iterator =
            colRidList.begin();

        while (rid_iterator != colRidList.end())
        {
            pos = colOIDList.find( rid_iterator->objnum );

            if (pos != colOIDList.end())
            {
                colOIDList.erase( pos ); // through with this column, so delete
            }
            else
            {
                boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
                    execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(
                        BULK_SYSCAT_SESSION_ID);
                cat->identity(execplan::CalpontSystemCatalog::EC);

                execplan::CalpontSystemCatalog::TableColName dbColName =
                    cat->colName( rid_iterator->objnum );
                std::ostringstream oss;
                oss << "No tag present in Job XML file for DB column: " <<
                    dbColName.column;
                throw std::runtime_error( oss.str() );
            }

            ++rid_iterator;
        }
    }
    catch (std::exception& ex)
    {
        std::ostringstream oss;
        oss << "Error validating column list for table " <<
            fJob.schema << '.' << tblName << "; " << ex.what();
        throw std::runtime_error( oss.str() );
    }
    catch (...)
    {
        std::ostringstream oss;
        oss << "Unknown Error validating column list for table " <<
            fJob.schema << '.' << tblName;
        throw std::runtime_error( oss.str() );
    }
}

//------------------------------------------------------------------------------
// Generate a permanent or temporary Job XML file name path.
// sXMLJobDir Command line override for complete Job directory path
// jobDIr     Job subdirectory under default <BulkRoot> path
// jobId      Job ID
// bTempFile  Are we creating a temporary Job Xml File
// schmaName  If temp file, this is schema name to use
// tableName  If temp file, this is the table name to use
// xmlDirPath The complete Job XML file path that is constructed
// errMsg     Relevant error message if return value is not NO_ERROR.
//------------------------------------------------------------------------------
/* static */
int XMLJob::genJobXMLFileName(
    const string& sXMLJobDir,
    const string& jobDir,
    const string& jobId,
    bool          bTempFile,
    const string& schemaName,
    const string& tableName,
    boost::filesystem::path& xmlFilePath,
    string& errMsg,
    std::string&	   tableOIDStr )
{
    // get full file directory path for XML job description file
    if (sXMLJobDir.empty())
    {
        xmlFilePath  = Config::getBulkRoot();
        xmlFilePath /= jobDir;
    }
    else
    {
        xmlFilePath = sXMLJobDir;

        //If filespec doesn't begin with a '/' (i.e. it's not an absolute path),
        // attempt to make it absolute so that we can log the full pathname.
        if (!xmlFilePath.has_root_path())
        {
#ifdef _MSC_VER
            // nothing else to do
#else
            char cwdPath[4096];
            getcwd(cwdPath, sizeof(cwdPath));
            string trailingPath(xmlFilePath.string());
            xmlFilePath  = cwdPath;
            xmlFilePath /= trailingPath;
#endif
        }
    }

    // Append the file name to the directory path
    string jobFileName;

    if (bTempFile)
    {
        // Create tmp directory if does not exist
        RETURN_ON_ERROR( createTempJobDir( xmlFilePath.string(), errMsg ) );
        jobFileName += tableOIDStr;
        //jobFileName += schemaName;
        // jobFileName += '_';
        // jobFileName += tableName;
        jobFileName += "_D";

        string now(boost::posix_time::to_iso_string( boost::posix_time::second_clock::local_time()));

        // microseconds
        struct timeval tp;
        gettimeofday(&tp, 0);
        ostringstream usec;
        usec << setfill('0') << setw(6) << tp.tv_usec;

        jobFileName += now.substr(0, 8);
        jobFileName += "_T";
        jobFileName += now.substr(9, 6);
        jobFileName += "_S";
        jobFileName += usec.str();
        jobFileName += '_';
    }

    jobFileName += "Job_";
    jobFileName += jobId;
    jobFileName += ".xml";

    xmlFilePath /= jobFileName;

    return NO_ERROR;
}

//------------------------------------------------------------------------------
// Create directory for temporary XML job description files.
// OAM restart should delete any/all files in this directory.
//------------------------------------------------------------------------------
/* static */
int XMLJob::createTempJobDir( const string& xmlFilePath,
                              string& errMsg )
{
    boost::filesystem::path pathDir(xmlFilePath);

    // create temp directory for XML job file if it does not exist
    try
    {
        if ( !boost::filesystem::exists( xmlFilePath ) )
        {
            string boostErrString;

            try
            {
                boost::filesystem::create_directories(pathDir);
            }
            catch (exception& ex)
            {
                // ignore exception for now; we may have just had a
                // race condition where 2 jobs were creating dirs.
                boostErrString = ex.what();
            }

            if ( !boost::filesystem::exists( xmlFilePath ) )
            {
                ostringstream oss;
                oss << "Error creating XML temp job file directory(1) " <<
                    xmlFilePath << "; " << boostErrString;
                errMsg = oss.str();

                return ERR_DIR_CREATE;
            }
        }
    }
    catch (exception& ex)
    {
        ostringstream oss;
        oss << "Error creating XML temp job file directory(2) " <<
            xmlFilePath << "; " << ex.what();
        errMsg = oss.str();

        return ERR_DIR_CREATE;
    }

    if (!boost::filesystem::is_directory(pathDir) )
    {
        ostringstream oss;
        oss << "Error creating XML temp job file directory " <<
            xmlFilePath << "; path already exists as non-directory" << endl;
        errMsg = oss.str();

        return ERR_DIR_CREATE;
    }

    return NO_ERROR;
}

} //end of namespace

